|
Python/FAQ/Обработка файлов
Материал из Wiki.crossplatform.ru
[править] Introduction
#-----------------------------
for line in DATAFILE:
line = line.rstrip()
size = len(line)
print size # output size of line
#-----------------------------
for line in datafile:
print length(line.rstrip()) # output size of line
#-----------------------------
# Read every remaining line of the file into a list.
lines = datafile.readlines()
#-----------------------------
# Read the rest of the file into a single string.
whole_file = myfile.read()
#-----------------------------
## No direct equivalent in Python
#% perl -040 -e '$word = <>; print "First word is $word\n";'
#-----------------------------
## No direct equivalent in Python
#% perl -ne 'BEGIN { $/="%%\n" } chomp; print if /Unix/i' fortune.dat
#-----------------------------
# 'print >>fileobj' writes to the given file object; the items are
# separated by single spaces.
print>>myfile, "One", "two", "three" # "One two three"
print "Baa baa black sheep." # Sent to default output file
#-----------------------------
# Read at most 4096 bytes; fewer (or none) are returned near end of file.
buffer = myfile.read(4096)
rv = len(buffer)
#-----------------------------
# Cut a file down to 'length' bytes, first through an open file object,
# then through a freshly opened one that is never explicitly closed.
myfile.truncate(length)
open("/tmp/%d.pid" % os.getpid(), "a").truncate(length)
#-----------------------------
# tell() reports the current offset from the start of the file.
pos = myfile.tell()
print "I'm", pos, "bytes from the start of DATAFILE."
#-----------------------------
# seek(offset, whence): whence 0 (default) is relative to the start,
# 1 to the current position, 2 to the end of the file.
logfile.seek(0, 2) # Seek to the end
datafile.seek(pos) # Seek to a given byte
outfile.seek(-20, 1) # Seek back 20 bytes
#-----------------------------
import os
import warnings
# Unbuffered write straight to the file descriptor.  os.write() returns the
# number of bytes actually written, which may be fewer than requested.
written = os.write(datafile.fileno(), mystr)
if written != len(mystr):
    warnings.warn("only wrote %s bytes, not %s" % (written, len(mystr)))
#-----------------------------
# A zero-byte seek relative to the current position (whence 1) just
# reports where the descriptor currently is.
pos = os.lseek(myfile.fileno(), 0, 1)   # don't change position
#-----------------------------
[править] Reading Lines with Continuation Characters
def ContReader(infile):
    """Yield logical lines from *infile*, joining backslash continuations.

    Each physical line has its trailing whitespace (including the line
    terminator) removed.  A line that then ends in a backslash is joined,
    minus the backslash, with the line(s) that follow it.  If the input
    ends in the middle of a continuation, whatever was collected so far
    is yielded as the final record.

    infile -- any iterable of strings, e.g. an open text file.
    """
    pending = []                    # pieces of the record being assembled
    for line in infile:
        line = line.rstrip()
        if line.endswith("\\"):
            # Continuation: keep the text before the backslash and go on.
            pending.append(line[:-1])
            continue
        pending.append(line)
        yield "".join(pending)
        pending = []
    if pending:
        # Input ended right after a continuation line.
        yield "".join(pending)
# Iterate over complete records, continuation lines already joined.
for line in ContReader(datafile):
    pass    # process full record in 'line' here
[править] Counting Lines (or Paragraphs or Records) in a File
import os
# Let wc(1) do the counting.  The file name is pasted into a shell command
# line unquoted, so only use this with trusted file names.
count = int(os.popen("wc -l < " + filename).read())
#-----------------------------
# enumerate() never runs the loop body for an empty file, so give 'count'
# a starting value that becomes 0 after the final increment.
count = -1
for count, line in enumerate(open(filename)):
    pass
count += 1                          # indexing is zero based
#-----------------------------
# Count lines by iterating over the file object.
myfile = open(filename)
count = 0
for line in myfile:
    count += 1
myfile.close()
# 'count' now holds the number of lines read
#-----------------------------
# The same count with explicit readline() calls; readline() returns an
# empty string only at end of file.
myfile = open(filename)
count = 0
while True:
    line = myfile.readline()
    if not line:
        break
    count += 1
myfile.close()
#-----------------------------
# Count newline characters in 64 KiB chunks instead of splitting into lines.
count = 0
while True:
    s = myfile.read(2**16)
    if not s:                       # empty read means end of file
        break
    count += s.count("\n")
#-----------------------------
import sys
# Pair each line with a running number; zip() stops when the file runs out.
count = 0                           # stays 0 if the file is empty
for line, count in zip(open(filename), xrange(1, sys.maxint)):
    pass
# 'count' now holds the number of lines read
#-----------------------------
import fileinput
# fileinput tracks the line number itself; read to the end and ask for it.
fi = fileinput.FileInput(filename)
while fi.readline():
    pass
count = fi.lineno()
#-----------------------------
def SepReader(infile, sep = "\n\n", chunk_size = 10000):
    """Yield the records of *infile* that are delimited by *sep*.

    The file is read in pieces of *chunk_size* characters.  The text after
    the last separator seen so far is carried over and prepended to the
    next piece, so a separator that straddles two reads is still found.

    infile     -- object with a read(n) method returning strings.
    sep        -- record separator (default: a blank line, "\\n\\n").
    chunk_size -- how many characters to request per read.

    Nothing is yielded for an empty file.  Otherwise the text following
    the final separator is always yielded as the last record, even when
    it is empty (same convention as str.split).
    """
    text = infile.read(chunk_size)
    if not text:
        return
    while True:
        fields = text.split(sep)
        # Everything but the last piece is a complete record.
        for field in fields[:-1]:
            yield field
        # The last piece may be incomplete; keep it for the next round.
        text = fields[-1]
        new_text = infile.read(chunk_size)
        if not new_text:
            yield text
            break
        text += new_text
# Count blank-line separated paragraphs.
para_count = 0
for para in SepReader(open(filename)):
    para_count += 1
# FIXME: For my test case (Python-pre2.2 README from CVS) this
# returns 175 paragraphs while Perl returns 174.
# NOTE(review): possibly because the separator here is exactly two
# newlines, so a run of several blank lines or a trailing blank line
# yields an extra (empty) record -- unconfirmed.
#-----------------------------
[править] Processing Every Word in a File
# Split every input line on whitespace.
for line in sys.stdin:
    for word in line.split():
        pass    # do something with 'word'
#-----------------------------
# Pull out "words" with a regular expression, resuming each search
# where the previous match ended.
pat = re.compile(r"(\w[\w'-]*)")
for line in sys.stdin:
    pos = 0
    while True:
        match = pat.search(line, pos)
        if not match:
            break
        pos = match.end(1)
        # do something with match.group(1)
# EXPERIMENTAL in the sre implementation but
# likely to be included in future (post-2.2) releases.
pat = re.compile(r"(\w[\w'-]*)")
for line in sys.stdin:
    scanner = pat.scanner(line)
    while True:
        match = scanner.search()
        if not match:
            break
        # do something with match.group(1)
#-----------------------------
# Make a word frequency count
import fileinput, re
pat = re.compile(r"(\w[\w'-]*)")
seen = {}
for line in fileinput.input():
pos = 0
while True:
match = pat.search(line, pos)
if not match:
break
pos = match.end(1)
text = match.group(1).lower()
seen[text] = seen.get(text, 0) + 1
# output dict in a descending numeric sort of its values
for text, count in sorted(seen.items, key=lambda item: item[1]):
print "%5d %s" % (count, text)
#-----------------------------
# Line frequency count
import fileinput, sys
seen = {}                           # lower-cased line -> occurrences
for line in fileinput.input():
    text = line.lower()
    seen[text] = seen.get(text, 0) + 1
# most frequent lines first; each key still carries its own newline,
# hence write() instead of print
for text, count in sorted(seen.items(), key=lambda item: item[1],
                          reverse=True):
    sys.stdout.write("%5d %s" % (count, text))
#-----------------------------
[править] Reading a File Backwards by Line or Paragraph
# Slurp the file, then consume the list from its end.
lines = myfile.readlines()
while lines:
    line = lines.pop()
    # do something with 'line'
#-----------------------------
# reversed() needs a sequence, so read the lines into a list first.
for line in reversed(myfile.readlines()):
    pass    # do something with line
#-----------------------------
# Index from the end: -1 is the last line, -len(lines) the first.
for i in range(len(lines)):
    line = lines[-i - 1]
#-----------------------------
# Paragraphs in reverse order of appearance.
for paragraph in reversed(list(SepReader(infile))):
    pass    # do something
#-----------------------------
[править] Trailing a Growing File
import time
while True:
for line in infile:
pass # do something with the line
time.sleep(SOMETIME)
infile.seek(0, 1)
#-----------------------------
import time
naptime = 1
logfile = open("/tmp/logfile")
while True:
for line in logfile:
print line.rstrip()
time.sleep(naptime)
infile.seek(0, 1)
#-----------------------------
import time
while True:
    curpos = logfile.tell()
    while True:
        line = logfile.readline()
        if not line:
            break
        curpos = logfile.tell()     # remember the end of the last full read
    time.sleep(naptime)
    logfile.seek(curpos, 0)         # seek to where we had been
#-----------------------------
import os
# Ask about the open file itself, not its name: once the file has been
# removed, stat() on the path raises OSError, whereas fstat() on the
# still-open descriptor reports a link count of 0.
if os.fstat(logfile.fileno()).st_nlink == 0:
    raise SystemExit
#-----------------------------
[править] Picking a Random Line from a File
import random, fileinput
text = None
for line in fileinput.input():
if random.randrange(fileinput.lineno()) == 0:
text = line
# 'text' is the random line
#-----------------------------
# XXX is the perl code correct? Where is the fortunes file opened?
import sys
adage = None
for i, rec in enumerate(SepReader(open("/usr/share/games/fortunes"), "%\n")):
if random.randrange(i+1) == 0:
adage = rec
print adage
#-----------------------------
[править] Randomizing All Lines
import random
lines = data.readlines()
random.shuffle(lines)
for line in lines:
print line.rstrip()
#-----------------------------
[править] Reading a Particular Line in a File
# All snippets below treat DESIRED_LINE_NUMBER as 1-based (the first line
# of the file is line 1), which is what linecache expects.
# using efficient caching system
import linecache
line = linecache.getline(filename, DESIRED_LINE_NUMBER)
# or doing it more oldskool
lineno = 0
while True:
    line = infile.readline()
    if not line:                    # ran out of file before reaching it
        break
    lineno += 1
    if lineno == DESIRED_LINE_NUMBER:
        break
#-----------------------------
lines = infile.readlines()
line = lines[DESIRED_LINE_NUMBER - 1]   # list indices are zero based
#-----------------------------
for i in range(DESIRED_LINE_NUMBER):
    line = infile.readline()
    if not line:
        break
## Not sure what this thing is doing. Allow fast access to a given
## line number?
# usage: build_index(*DATA_HANDLE, *INDEX_HANDLE)
[править] Processing Variable-Length Text Fields
# given a record 'text' whose fields are separated by a pattern,
# extract the list 'fields'.
fields = re.split(pattern_string, text)
#-----------------------------
# Pre-compile the pattern when it is used repeatedly.
pat = re.compile(pattern_string)
fields = pat.split(text)
#-----------------------------
# A capturing group in the pattern keeps the separators in the result.
re.split(r"([+-])", "3+5-2")
#-----------------------------
['3', '+', '5', '-', '2']
#-----------------------------
fields = record.split(":")
#-----------------------------
fields = re.split(r":", record)
#-----------------------------
fields = re.split(r"\s+", record)
#-----------------------------
# split() without an argument splits on runs of whitespace and ignores
# leading/trailing whitespace, like Perl's split(" ", $RECORD).
fields = record.split()
#-----------------------------
[править] Removing the Last Line of a File
# First pass: find the offset at which the last line starts.
myfile = open(filename, "r")
prev_pos = pos = 0
while True:
    line = myfile.readline()
    if not line:
        break
    prev_pos = pos                  # start of the line just read
    pos = myfile.tell()             # start of the next line
myfile.close()
# Second pass: chop the file off at that offset.
myfile = open(filename, "a")
myfile.truncate(prev_pos)
myfile.close()
#-----------------------------
[править] Processing Binary Files
# A "b" in the mode string requests binary mode (no newline translation).
open(filename, "rb")
open(filename, "wb")
#-----------------------------
gifname = "picture.gif"
gif_file = open(gifname, "rb")
# Don't think there's an equivalent for these in Python
# NOTE(review): on Windows, msvcrt.setmode(fd, os.O_BINARY) looks like the
# counterpart for an already-open descriptor -- confirm before relying on it.
#binmode(GIF); # now DOS won't mangle binary input from GIF
#binmode(STDOUT); # now DOS won't mangle binary output to STDOUT
#-----------------------------
import sys
# Copy the file to standard output 8 KiB at a time.
while True:
    buff = gif_file.read(8 * 2**10)
    if not buff:
        break
    sys.stdout.write(buff)
#-----------------------------
[править] Using Random-Access I/O
# Record number 'recno' (counting from 0) starts 'recsize * recno' bytes
# into the file; seek there and read exactly one record.
address = recsize * recno
myfile.seek(address, 0)
buffer = myfile.read(recsize)
#-----------------------------
# The same offset computation when records are numbered from 1.
address = recsize * (recno-1)
#-----------------------------
[править] Updating a Random-Access File
import os
address = recsize * recno
myfile.seek(address)
buffer = myfile.read(recsize)
# ... work with the buffer, then turn it back into a string and ...
# Step back over the record just read so the write replaces it in place.
myfile.seek(-recsize, os.SEEK_CUR)
myfile.write(buffer)
myfile.close()
#-----------------------------
## Not yet implemented
# weekearly -- set someone's login date back a week
# @@INCOMPLETE@@
[править] Reading a String from a Binary File
## Note: this isn't optimal -- the 's+=c' may go O(N**2) so don't
## use for large strings.
myfile.seek(addr)
s = ""
while True:
    c = myfile.read(1)
    if not c or c == "\0":
        break
    s += c
#-----------------------------
# Faster variant: scan in 1000-byte blocks to find how far away the NUL
# is, then go back and read the string in one call.
myfile.seek(addr)
offset = 0                          # bytes between addr and the NUL (or EOF)
while True:
    s = myfile.read(1000)
    x = s.find("\0")
    if x != -1:
        offset += x
        break
    offset += len(s)
    if len(s) != 1000: # EOF
        break
myfile.seek(addr)
s = myfile.read(offset)             # the string itself, without the NUL
myfile.read(1)                      # skip over the terminating NUL
#-----------------------------
## Not Implemented
# bgets - get a string from an address in a binary file
#-----------------------------
#!/usr/bin/perl
# strings - pull strings out of a binary file
import re, sys
## Assumes SepReader from above
pat = re.compile(r"([\040-\176\s]{4,})")
for block in SepReader(sys.stdin, "\0"):
pos = 0
while True:
match = pat.search(block, pos)
if not match:
break
print match.group(1)
pos = match.end(1)
#-----------------------------
[править] Reading Fixed-Length Records
# RECORDSIZE is the length of a record, in bytes.
# TEMPLATE is the unpack template for the record
# FILE is the file to read from
# FIELDS is a tuple, one element per field
import struct
RECORDSIZE = struct.calcsize(TEMPLATE)
while True:
    record = FILE.read(RECORDSIZE)
    if not record:                  # clean end of file: no more records
        break
    if len(record) != RECORDSIZE:
        # The file ended in the middle of a record.
        raise IOError("short read")
    FIELDS = struct.unpack(TEMPLATE, record)
# ----
# ----
[править] Reading Configuration Files
# NOTE: to parse INI file, see the standard ConfigParser module.
import re
pat = re.compile(r"\s*=\s*")
for line in config_file:
    if "#" in line:                 # no comments
        line = line[:line.index("#")]
    line = line.strip()             # no leading or trailing white
    if not line:                    # anything left?
        continue
    m = pat.search(line)
    if m is None:
        # No "=" on the line: record the bare name with no value instead
        # of failing on m.start().
        User_Preferences[line] = None
        continue
    var = line[:m.start()]
    value = line[m.end():]
    User_Preferences[var] = value
[править] Testing a File for Trustworthiness
import os
mode, ino, dev, nlink, uid, gid, size, \
atime, mtime, ctime = os.stat(filename)
mode &= 07777 # discard file type info
#-----------------------------
info = os.stat(filename)
if info.st_uid == 0:
print "Superuser owns", filename
if info.st_atime > info.st_mtime:
print filename, "has been read since it was written."
#-----------------------------
import os
import stat
def is_safe(path):
    """Return True if *path* itself looks safe from tampering by others.

    A path is considered safe when it is owned by the superuser or by the
    real user id of this process, and either nobody else can write to it
    or it is a directory with the sticky bit set.

    Raises OSError (from os.stat) if *path* cannot be examined.
    """
    info = os.stat(path)
    # owner neither superuser nor me
    # (the real uid, Perl's $< variable, is os.getuid() here)
    if info.st_uid not in (0, os.getuid()):
        return False
    # check whether group or other can write file.
    # (test the read bits as well to detect either reading or writing)
    if info.st_mode & (stat.S_IWGRP | stat.S_IWOTH):
        # someone else can write this; non-directories aren't safe
        if not stat.S_ISDIR(info.st_mode):
            return False
        # but directories with the sticky bit (01000) are
        if not (info.st_mode & stat.S_ISVTX):
            return False
    return True
#-----------------------------
## XXX What is '_PC_CHOWN_RESTRICTED'?
## NOTE(review): it appears to be the POSIX pathconf() setting telling
## whether chown is restricted to privileged processes; Python seems to
## expose it as os.pathconf(path, "PC_CHOWN_RESTRICTED") -- confirm.
def is_verysafe(path):
    """Return True if *path* and every directory above it pass is_safe().

    The path is made absolute first, so relative paths and paths with a
    trailing slash are checked all the way up to the root directory.
    """
    path = os.path.abspath(path)
    while True:
        if not is_safe(path):
            return False
        parent = os.path.dirname(path)
        if parent == path:          # reached the root, nothing above it
            break
        path = parent
    return True
#-----------------------------
# Program: tctee
# Not Implemented (requires reimplementing Perl's builtin '>>', '|',
# etc. semantics)
[править] Program: tailwtmp
#!/usr/bin/python
# tailwtmp - watch for logins and logouts;
# uses linux utmp structure, from /usr/include/bits/utmp.h
# /* The structure describing an entry in the user accounting database. */
# struct utmp
# {
# short int ut_type; /* Type of login. */
# pid_t ut_pid; /* Process ID of login process. */
# char ut_line[UT_LINESIZE]; /* Devicename. */
# char ut_id[4]; /* Inittab ID. */
# char ut_user[UT_NAMESIZE]; /* Username. */
# char ut_host[UT_HOSTSIZE]; /* Hostname for remote login. */
# struct exit_status ut_exit; /* Exit status of a process marked
# as DEAD_PROCESS. */
# long int ut_session; /* Session ID, used for windowing. */
# struct timeval ut_tv; /* Time entry was made. */
# int32_t ut_addr_v6[4]; /* Internet address of remote host. */
# char __unused[20]; /* Reserved for future use. */
# };
# /* Values for the `ut_type' field of a `struct utmp'. */
# #define EMPTY 0 /* No valid user accounting information. */
#
# #define RUN_LVL 1 /* The system's runlevel. */
# #define BOOT_TIME 2 /* Time of system boot. */
# #define NEW_TIME 3 /* Time after system clock changed. */
# #define OLD_TIME 4 /* Time when system clock changed. */
#
# #define INIT_PROCESS 5 /* Process spawned by the init process. */
# #define LOGIN_PROCESS 6 /* Session leader of a logged in user. */
# #define USER_PROCESS 7 /* Normal process. */
# #define DEAD_PROCESS 8 /* Terminated process. */
#
# #define ACCOUNTING 9
import time
import struct
import os
class WTmpRecord:
    """One fixed-size binary record of the wtmp login accounting file.

    After unpack() has been called, the fields listed in _fieldnames can
    be read as attributes (rec.type, rec.User, ...).
    """
    # struct format of one record.
    # NOTE(review): the layout is platform specific -- verify it against
    # the local utmp.h before trusting the decoded values.
    fmt = "hI32s4s32s256siili4l20s"
    # Attribute names for the first ten unpacked values, in order.
    _fieldnames = ["type","PID","Line","inittab","User","Hostname",
                   "exit_status", "session", "time", "addr" ]
    # Positions of the fixed-width, NUL-padded string fields.
    _string_fields = (2, 3, 4, 5)

    def __init__(self):
        self._rec_size = struct.calcsize(self.fmt)
        self._rec = None            # no record unpacked yet

    def size(self):
        """Return the size in bytes of one packed record."""
        return self._rec_size

    def unpack(self, bin_data):
        """Decode *bin_data* (exactly size() bytes) and return the values.

        String fields are cut at their first NUL byte.  Raises
        struct.error if *bin_data* has the wrong length.
        """
        # A single NUL of whatever string type struct produces here.
        nul = struct.pack("x")
        self._rec = []
        for i, value in enumerate(struct.unpack(self.fmt, bin_data)):
            if i in self._string_fields:
                # remove character zeros from strings
                value = value.split(nul)[0]
            self._rec.append(value)
        return self._rec

    def fieldnames(self):
        """Return the list of attribute names available after unpack()."""
        return self._fieldnames

    def __getattr__(self, name):
        # Only called for names not found the normal way.  Unknown names
        # (including '_rec' on a half-built instance) must raise
        # AttributeError rather than recurse or raise ValueError.
        if name not in self._fieldnames:
            raise AttributeError(name)
        if self._rec is None:
            raise AttributeError("%s: no record has been unpacked yet" % name)
        return self._rec[self._fieldnames.index(name)]
rec = WTmpRecord()
f = open("/var/log/wtmp","rb")
f.seek(0,2)
while True:
while True:
bin = f.read(rec.size())
if len(bin) != rec.size():
break
rec.unpack(bin)
if rec.type != 0:
print " %1d %-8s %-12s %-24s %-20s %5d %08x" % \
(rec.type, rec.User, rec.Line,
time.strftime("%a %Y-%m-%d %H:%M:%S",time.localtime(rec.time)),
rec.Hostname, rec.PID, rec.addr)
time.sleep(1)
f.close()
[править] Program: tctee
# @@INCOMPLETE@@
# @@INCOMPLETE@@
[править] Program: laston
#!/usr/bin/python
# laston - find out when given user last logged on
import sys
import struct
import pwd
import time
import re
f = open("/var/log/lastlog","rb")
fmt = "L32s256s"
rec_size = struct.calcsize(fmt)
for user in sys.argv[1:]:
if re.match(r"^\d+$", user):
user_id = int(user)
else:
try:
user_id = pwd.getpwnam(user)[2]
except:
print "no such uid %s" % (user)
continue
f.seek(rec_size * user_id)
bin = f.read(rec_size)
if len(bin) == rec_size:
data = struct.unpack(fmt, bin)
if data[0]:
logged_in = "at %s" % (time.strftime("%a %H:%M:%S %Y-%m-%d",
time.localtime(data[0])))
line = " on %s" % (data[1])
host = " from %s" % (data[2])
else:
logged_in = "never logged in"
line = ""
host = ""
print "%-8s UID %5d %s%s%s" % (user, user_id, logged_in, line, host)
else:
print "Read failed."
f.close()
|